#include "config.h"

#include <time.h>
#include <dirent.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <stdarg.h>
#include <libgen.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "sysy_lib.h"
#include "lichbd_rpc.h"
#include "cache.h"
#include "get_version.h"
#include "cluster.h"
#include "volume.h"
#include "lichstor.h"
#include "stor_root.h"
#include "main_loop.h"
#include "lich_md.h"
#include "rpc_table.h"
#include "md_map.h"
#include "configure.h"
#include "job_dock.h"
#include "chunk.h"
#include "lichbd.h"
#include "lichstor.h"
#include "mem_cache.h"
#include "ynet_rpc.h"
#include "net_global.h"
#include "ylog.h"
#include "dbg.h"

/*
 * Context record for an asynchronous lichbd request.
 *
 * NOTE(review): nothing in the visible part of this file uses this type;
 * the per-field notes below are inferred from the field names and from the
 * similarly shaped lichbd_ctx_t further down -- confirm before relying on them.
 */
typedef struct {
        buffer_t buf;
        io_t io;
        func1_t func;                   /* presumably the completion callback */
        void *arg;                      /* presumably the callback's opaque argument */
        void *_buf;
        int retry;                      /* presumably a retry counter */
        time_t time;                    /* presumably the request start time */
        char pool[MAX_NAME_LEN];
} lichbd_async_ctx_t;

/* Prefix prepended to image names by __lichbd_path(); "" means "no prefix". */
static char __lichbd_root__[MAX_NAME_LEN];
/* Set to 1 as soon as lichbd_init()/lichbd_init_new() starts initialising. */
static int __inited__ = 0;
/*
 * When non-zero, the I/O callbacks retry on EAGAIN/ENOSPC.
 * lichbd_init() sets it to 1, lichbd_init_new() sets it to 0.
 */
static int __retry__ = 0;

/* Compiles in the periodic analysis dump thread and its start-up calls in the init functions. */
#define __ANALYSIS__

#ifdef  __ANALYSIS__
/*
 * Thread body for the analysis dumper.
 *
 * Logs a start marker, then loops forever: sleep 30 seconds, log, and call
 * analysis_dumpall().  The argument is ignored and the function never
 * actually returns.
 */
static void *__analysis_dump(void *arg)
{
        (void) arg;
        DINFO("start...\n");

        for (;;) {
                sleep(30);
                DINFO("analysis dump\n");
                analysis_dumpall();
        }

        return NULL;
}

/*
 * Spawn the detached background thread running __analysis_dump().
 *
 * Returns 0 on success, or the error number reported by pthread_create().
 */
static int __analysis_start()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        /*
         * pthread_create() returns the error number directly (it never
         * returns -1 and does not set errno), so test for non-zero.
         */
        ret = pthread_create(&th, &ta, __analysis_dump, NULL);

        /* The attribute object is no longer needed once the thread exists. */
        (void) pthread_attr_destroy(&ta);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#endif

/*
 * Build the storage path for an image name.
 *
 * Writes at most MAX_NAME_LEN bytes into `path`: `name` itself when no root
 * prefix is configured, otherwise "<root>/<name>".
 */
static void __lichbd_path(const char *name, char *path)
{
        if (__lichbd_root__[0] == '\0') {
                snprintf(path, MAX_NAME_LEN, "%s", name);
                return;
        }

        snprintf(path, MAX_NAME_LEN, "%s/%s",  __lichbd_root__, name);
}

/*
 * Resolve an image name to its id.
 *
 * The name is turned into a path with __lichbd_path() and looked up with
 * stor_lookup1(); EAGAIN goes through USLEEP_RETRY, any other error is
 * returned.  On success the id is stored in *image and 0 is returned.
 */
int lichbd_open(const char *pool, const char *name, lichbd_image_t *image)
{
        int ret, retry = 0;
        fileid_t fileid;
        char path[MAX_NAME_LEN];

        __lichbd_path(name, path);
        DINFO("open %s\n", path);

retry:
        ret = stor_lookup1(pool, path, &fileid);
        if (unlikely(ret)) {
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        *image = fileid;

        return 0;
err_ret:
        return ret;
}

/*
 * Make sure the pool directory `path` exists, creating missing ancestors
 * recursively, and report the id of `path` itself.
 *
 * pool       pool name handed to stor_lookup1()
 * path       directory to create
 * _parentid  optional out parameter; receives the id of `path`
 *
 * Returns 0 on success or an errno-style code.
 *
 * Previously a freshly created directory reported the id of its *parent*
 * (stor_mkpool() is called without an output id), while the EEXIST branch
 * reported the id of `path`.  Callers (lichbd_create() and the recursive
 * call below) create children under the returned id, so both cases now
 * resolve `path` after the mkpool step.
 */
static int __lichbd_mkpool(const char *pool, const char *path, fileid_t *_parentid)
{
        int ret, retry = 0;
        char name[MAX_NAME_LEN], parent[MAX_PATH_LEN];
        fileid_t parentid, poolid;

        ret = _path_split2(path, parent, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

retry:
        ret = stor_lookup1(pool, parent, &parentid);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else if (ret == ENOENT) {
                        /* parent is missing: create it first and use its id */
                        ret = __lichbd_mkpool(pool, parent, &parentid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        }

        DINFO("mkpool %s, %s %s\n", path, parent, name);

        ret = stor_mkpool(&parentid, name, NULL, NULL);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else if (ret != EEXIST) {
                        GOTO(err_ret, ret);
                }
                /* EEXIST: somebody else created it, fall through to the lookup */
        }

        /* Whether we created it or it already existed, report the id of `path`. */
        ret = stor_lookup1(pool, path, &poolid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (_parentid)
                *_parentid = poolid;

        return 0;
err_ret:
        return ret;
}


/*
 * Create a volume named `_name` and return its id in *image.
 *
 * The name is expanded with __lichbd_path(), split into parent directory
 * and leaf, the parent is ensured with __lichbd_mkpool(), and the volume is
 * created with stor_mkvol().  EAGAIN from stor_mkvol() goes through
 * USLEEP_RETRY; other errors are returned as-is.
 */
int lichbd_create(const char *pool, const char *_name, lichbd_image_t *image)
{
        int ret, retry = 0;
        char name[MAX_NAME_LEN], parent[MAX_PATH_LEN], path[MAX_PATH_LEN];
        fileid_t parentid, fileid;

        __lichbd_path(_name, path);

        ret = _path_split2(path, parent, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("create %s, %s %s\n", path, parent, name);

        ret = __lichbd_mkpool(pool, parent, &parentid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

retry:
        ret = stor_mkvol(&parentid, name, NULL, &fileid);
        if (unlikely(ret)) {
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        *image = fileid;

        return 0;
err_ret:
        return ret;
}

/*
 * Resize an image to `size` bytes via stor_truncate().
 *
 * EAGAIN goes through USLEEP_RETRY; any other error is returned.
 * Returns 0 on success.
 */
int lichbd_truncate(const char *pool, const lichbd_image_t *image, uint64_t size)
{
        int ret, retry = 0;

        /* The statement terminator was missing here; it only compiled by
         * accident of how DBUG expands. */
        DBUG("truncate "CHKID_FORMAT" size %llu\n", CHKID_ARG(image), (LLU)size);

retry:
        ret = stor_truncate(pool, image, size);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Fetch the pool name of an image into `pool` via stor_getpool().
 *
 * EAGAIN goes through USLEEP_RETRY; any other error is returned.
 * Returns 0 on success.
 */
int lichbd_lookup(const lichbd_image_t *image, char *pool)
{
        int ret, retry = 0;

retry:
        ret = stor_getpool(image, pool);
        if (unlikely(ret)) {
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        DINFO("lookup "CHKID_FORMAT" pool %s\n", CHKID_ARG(image), pool);

        return 0;
err_ret:
        return ret;
}

/*
 * Fill `stbuf` with the attributes of an image via stor_getattr().
 *
 * EAGAIN goes through USLEEP_RETRY; any other error is returned.
 * Returns 0 on success.
 */
int lichbd_stat(const char *pool, const lichbd_image_t *image, struct stat *stbuf)
{
        int ret, retry = 0;

        DINFO("getattr "CHKID_FORMAT"\n", CHKID_ARG(image));

retry:
        ret = stor_getattr(pool, image, stbuf);
        if (unlikely(ret)) {
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Ask the storage layer to localize an image via stor_localize().
 *
 * EAGAIN goes through USLEEP_RETRY, ENOSPC is deliberately treated as
 * success, and every other error is returned.  Returns 0 otherwise.
 */
int lichbd_localize(const char *pool, const lichbd_image_t *image)
{
        int ret, retry = 0;

        DINFO("localize "CHKID_FORMAT"\n", CHKID_ARG(image));

retry:
        ret = stor_localize(pool, image);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else if (ret != ENOSPC) {
                        GOTO(err_ret, ret);
                }
                /* ENOSPC: nothing to do, report success */
        }

        return 0;
err_ret:
        return ret;
}


/*
 * One-time library initialisation (retry mode enabled).
 *
 * root  optional path prefix for image names; when NULL the prefix is
 *       "/" followed by gloconf.lichbd_root.
 *
 * Calls after the first are no-ops returning 0.  Returns 0 on success,
 * ENAMETOOLONG when `root` does not fit in the MAX_NAME_LEN prefix buffer,
 * or the error of the failing initialisation step.
 *
 * NOTE(review): __inited__ is set before the steps that can fail, so after
 * a failed call every later call returns 0 without initialising anything.
 */
int lichbd_init(const char *root)
{
        int ret, retry = 0;

        //DINFO("init\n");

        if (__inited__)
                return 0;

        /* The prefix used to be strcpy()'d unchecked into a fixed buffer. */
        if (root && strlen(root) >= MAX_NAME_LEN)
                return ENAMETOOLONG;

        __inited__ = 1;
        __retry__ = 1;

        analysis_init();

        dbg_info(0);

        ret = env_init_simple("lichbd");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (root) {
                snprintf(__lichbd_root__, MAX_NAME_LEN, "%s", root);
        } else {
                snprintf(__lichbd_root__, MAX_NAME_LEN, "/%s", gloconf.lichbd_root);
        }

retry0:
        ret = network_connect_master();
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry0, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        ret = stor_init(NULL, -1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#ifdef __ANALYSIS__
        ret = __analysis_start();
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}

/*
 * Alternative initialisation: retry mode disabled and no root prefix.
 *
 * Asserts that the library has not been initialised yet, connects to the
 * master (EAGAIN goes through USLEEP_RETRY), runs stor_init() and, when
 * __ANALYSIS__ is defined, starts the analysis thread.
 * Returns 0 on success or the error of the failing step.
 */
int lichbd_init_new()
{
        int ret, retry = 0;

        //DINFO("init\n");

        YASSERT(__inited__ == 0);
        __inited__ = 1;
        __retry__ = 0;

        analysis_init();

        dbg_info(1);

        /* empty prefix: image names are used as given */
        __lichbd_root__[0] = '\0';

retry0:
        ret = network_connect_master();
        if (unlikely(ret)) {
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry0, retry, 50, (100 * 1000));
        }

        ret = stor_init(NULL, -1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#ifdef __ANALYSIS__
        ret = __analysis_start();
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}

/*
 * Per-request context shared between the lichbd_* dispatch functions and
 * the __lichbd_* callbacks run through vm_request().  Allocated from
 * MEM_CACHE_4K by the dispatcher and released by the callback.
 */
typedef struct {
        fileid_t fileid;        /* copied from ioctx->fileid by read/write; only used in log messages here */
        fileid_t snapid;        /* snapshot id (source snapshot for snap_diff) */
        fileid_t snapdst;       /* destination snapshot for snap_diff */
        size_t size;            /* request length passed to the rpc call */
        off_t offset;           /* request offset passed to the rpc call */
        int flag;               /* passed through to lichbd_rpc_write() */
        func1_t func;           /* completion callback, called as func(arg, &int_result) */
        void *arg;              /* opaque argument for func */
        void *_buf;             /* flat destination buffer for the char* read variants, else NULL */
        buffer_t buf;           /* context-owned buffer; freed by the callbacks */
        buffer_t *buffer;       /* caller's buffer_t for lichbd_read()/lichbd_write() */
        int retry;              /* number of retries performed by the callback */
        time_t time;            /* gettime() at dispatch, bounds the retry window */
        lichbd_ioctx_t *ioctx;  /* connection the request was issued on */
        int localize;           /* passed through to lichbd_rpc_read() */
} lichbd_ctx_t;

/*
 * vm_request() callback performing one write.
 *
 * Calls lichbd_rpc_write() with the context's buffer.  While __retry__ is
 * set, fewer than 100 retries were made, the error is EAGAIN or ENOSPC and
 * less than 4 * gloconf.rpc_timeout has elapsed since dispatch, it sleeps
 * one second and tries again.  The completion callback receives the byte
 * count on success or the negated error code on failure; the context and
 * its buffer are released in both cases.
 */
static void __lichbd_write(void *arg)
{
        int ret;
        lichbd_ctx_t *ctx = arg;

        ANALYSIS_BEGIN(0);

retry:
        ret = lichbd_rpc_write(ctx->buffer, ctx->size, ctx->offset, ctx->flag);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (!(__retry__ && ctx->retry < 100 && (ret == EAGAIN || ret == ENOSPC)
                      && ((gettime() - ctx->time) < gloconf.rpc_timeout * 4))) {
                        DWARN("write cmd(%p, %llu, %llu), ret (%d) %s\n",
                              ctx, (LLU)ctx->offset, (LLU)ctx->size,
                              ret, strerror(ret));
                        GOTO(err_ret, ret);
                }

                /* only start complaining once the retries pile up */
                if (ctx->retry > 10) {
                        DWARN("write "CHKID_FORMAT" (%llu, %llu),"
                              " ret (%d) %s, need retry %u\n",
                              CHKID_ARG(&ctx->fileid),
                              (LLU)ctx->offset, (LLU)ctx->size,
                              ret, strerror(ret), ctx->retry);
                }

                ctx->retry++;
                schedule_sleep("lichbd_write_retry", 1000 * 1000);

                goto retry;
        }

        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 4), NULL);

        ret = ctx->size;
        mbuffer_free(&ctx->buf);
        ctx->func(ctx->arg, &ret);
        mem_cache_free(MEM_CACHE_4K, ctx);

        return;
err_ret:
        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 4), NULL);
        ret = -ret;
        mbuffer_free(&ctx->buf);
        ctx->func(ctx->arg, &ret);
        mem_cache_free(MEM_CACHE_4K, ctx);
        return;
}

/*
 * Queue an asynchronous write of `size` bytes from `buf` at `offset`.
 *
 * A context is allocated and handed to __lichbd_write() through
 * vm_request(); `func(arg, &result)` is called on completion with the byte
 * count or a negative error code.  If the request cannot be queued (after
 * the USLEEP_RETRY attempts) `func` is called from here with the negated
 * error.
 *
 * The dispatch-failure path now releases ctx->buf as well, matching what
 * __lichbd_write() does on both of its exits.
 */
void lichbd_write(lichbd_ioctx_t *ioctx, const buffer_t *buf, size_t size, off_t offset, int flag,
                  func1_t func, void *arg)
{
        int ret, retry = 0;
        lichbd_ctx_t *ctx;

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*ctx)  < sizeof(mem_cache4k_t), "lichbd_ctx_t");
#endif

        ctx = mem_cache_calloc(MEM_CACHE_4K, 1);
        ctx->size = size;
        ctx->offset = offset;
        ctx->flag = flag;
        ctx->func = func;
        ctx->arg = arg;
        ctx->retry = 0;
        ctx->fileid = ioctx->fileid;
        ctx->time = gettime();
        ctx->ioctx = ioctx;
        ctx->buffer = (void *)buf;

        /* for lichbd_write1 mbuffer_free in callback __lichbd_write */
        ret = mbuffer_init(&ctx->buf, 0);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

retry:
        ret = vm_request(ioctx->vm, __lichbd_write, ctx, "lichbd_write");
        if (unlikely(ret)) {
                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        return;
err_ret:
        ret = -ret;
        mbuffer_free(&ctx->buf);
        mem_cache_free(MEM_CACHE_4K, ctx);
        func(arg, &ret);
        return;
}

/*
 * Queue an asynchronous write of `size` bytes from the flat buffer `buf`.
 *
 * The data is copied into the context-owned buffer, so the caller's memory
 * is not referenced after this call returns.  `func(arg, &result)` is
 * called with the byte count or a negative error code.
 *
 * Fixes:
 *  - ctx->buffer was never set, so __lichbd_write() passed a NULL buffer to
 *    lichbd_rpc_write(); it now points at the copied data.
 *  - the copied data was not released when the request could not be queued.
 */
void lichbd_write1(lichbd_ioctx_t *ioctx, const char *buf, size_t size, off_t offset, int flag,
                func1_t func, void *arg)
{
        int ret, retry = 0;
        lichbd_ctx_t *ctx;

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*ctx)  < sizeof(mem_cache4k_t), "lichbd_ctx_t");
#endif

        ctx = mem_cache_calloc(MEM_CACHE_4K, 1);
        ctx->size = size;
        ctx->offset = offset;
        ctx->flag = flag;
        ctx->func = func;
        ctx->arg = arg;
        ctx->retry = 0;
        ctx->fileid = ioctx->fileid;
        ctx->time = gettime();
        ctx->ioctx = ioctx;

        ret = mbuffer_init(&ctx->buf, 0);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = mbuffer_copy(&ctx->buf, buf, size);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        /* __lichbd_write() sends ctx->buffer: point it at our private copy */
        ctx->buffer = &ctx->buf;

retry:
        ret = vm_request(ioctx->vm, __lichbd_write, ctx, "lichbd_write1");
        if (unlikely(ret)) {
                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        return;
err_ret:
        ret = -ret;
        mbuffer_free(&ctx->buf);
        mem_cache_free(MEM_CACHE_4K, ctx);
        func(arg, &ret);
        return;
}

/*
 * vm_request() callback performing one read.
 *
 * Reads into the context-owned buffer with lichbd_rpc_read().  While
 * __retry__ is set, fewer than 100 retries were made, the error is EAGAIN
 * or ENOSPC and less than 4 * gloconf.rpc_timeout has elapsed since
 * dispatch, it sleeps one second and tries again.
 *
 * On success the data is either copied to the flat buffer ctx->_buf or
 * merged into the caller's buffer_t ctx->buffer, and the callback receives
 * the byte count.  On failure the callback receives the negated error.
 *
 * The error path now always releases ctx->buf; previously it did so only
 * for the flat-buffer variant, so anything the rpc layer had already put
 * into the buffer was leaked for lichbd_read() callers.
 */
static void __lichbd_read(void *arg)
{
        int ret;
        lichbd_ctx_t *ctx;

        ctx = arg;

        DBUG("read size %u offset %llu\n", (int)ctx->size, (LLU)ctx->offset);

        ANALYSIS_BEGIN(0);

retry:
        ret = lichbd_rpc_read(&ctx->buf, ctx->size, ctx->offset, ctx->localize);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (__retry__ && ctx->retry < 100 && (ret == EAGAIN || ret == ENOSPC)
                    && ((gettime() - ctx->time) < gloconf.rpc_timeout * 4)) {
                        if (ctx->retry > 10) {
                                DWARN("read "CHKID_FORMAT" (%llu, %llu),"
                                      " ret (%d) %s, need retry %u\n",
                                      CHKID_ARG(&ctx->fileid),
                                      (LLU)ctx->offset, (LLU)ctx->size,
                                      ret, strerror(ret), ctx->retry);
                        }

                        ctx->retry++;
                        schedule_sleep("lichbd_read_retry", 1000 * 1000);

                        goto retry;
                } else {
                        DWARN("read cmd(%p, %llu, %llu), ret (%d) %s\n",
                              ctx, (LLU)ctx->offset, (LLU)ctx->size,
                              ret, strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 4), NULL);

        YASSERT(ctx->buf.len == ctx->size);
        if (ctx->_buf) {
                /* flat-buffer caller: copy out, then drop our buffer */
                mbuffer_get(&ctx->buf, ctx->_buf, ctx->buf.len);
                mbuffer_free(&ctx->buf);
        } else {
                /* buffer_t caller: hand the data over to the caller's buffer */
                mbuffer_merge(ctx->buffer, &ctx->buf);
        }

        ret = ctx->size;
        ctx->func(ctx->arg, &ret);
        mem_cache_free(MEM_CACHE_4K, ctx);

        return;
err_ret:
        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 4), NULL);
        ret = -ret;
        /* ctx->buf belongs to the context in both variants: always free it */
        mbuffer_free(&ctx->buf);
        ctx->func(ctx->arg, &ret);
        mem_cache_free(MEM_CACHE_4K, ctx);
        return;
}

/*
 * Queue an asynchronous read of `size` bytes at `offset` into the caller's
 * buffer_t `buf`.
 *
 * A context is allocated and handed to __lichbd_read() through
 * vm_request(); `func(arg, &result)` is called on completion.  If the
 * request cannot be queued (after the USLEEP_RETRY attempts) the context is
 * released and `func` is called from here with the negated error.
 */
void lichbd_read(lichbd_ioctx_t *ioctx, buffer_t *buf, size_t size, off_t offset, int localize,
                 func1_t func, void *arg)
{
        int ret, retry = 0;
        lichbd_ctx_t *ctx;

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*ctx)  < sizeof(mem_cache4k_t), "lichbd_ctx_t");
#endif

        DBUG("read size %u offset %llu\n", (int)size, (LLU)offset);

        ctx = mem_cache_calloc(MEM_CACHE_4K, 1);

        /* what to read */
        ctx->fileid = ioctx->fileid;
        ctx->size = size;
        ctx->offset = offset;
        ctx->localize = localize;

        /* where the data goes: caller's buffer_t, no flat buffer */
        ctx->buffer = buf;
        ctx->_buf = NULL;

        /* completion and bookkeeping */
        ctx->func = func;
        ctx->arg = arg;
        ctx->retry = 0;
        ctx->time = gettime();
        ctx->ioctx = ioctx;

        ret = mbuffer_init(&ctx->buf, 0);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

retry:
        ret = vm_request(ioctx->vm, __lichbd_read, ctx, "lichbd_read");
        if (unlikely(ret)) {
                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        return;
err_ret:
        ret = -ret;
        mem_cache_free(MEM_CACHE_4K, ctx);
        func(arg, &ret);
        return;
}

/*
 * Queue an asynchronous read of `size` bytes at `offset` into the flat
 * buffer `buf`.
 *
 * Same as lichbd_read() except for the destination and the queueing retry
 * policy: a failed vm_request() is retried once per second, and the request
 * is failed (func called with the negated error) once the retry counter
 * exceeds 10.
 */
void lichbd_read1(lichbd_ioctx_t *ioctx, char *buf, size_t size, off_t offset, int localize,
                func1_t func, void *arg)
{
        int ret, retry = 0;
        lichbd_ctx_t *ctx;

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*ctx)  < sizeof(mem_cache4k_t), "lichbd_ctx_t");
#endif

        DBUG("read size %u offset %llu\n", (int)size, (LLU)offset);

        ctx = mem_cache_calloc(MEM_CACHE_4K, 1);

        /* what to read */
        ctx->fileid = ioctx->fileid;
        ctx->size = size;
        ctx->offset = offset;
        ctx->localize = localize;

        /* where the data goes: caller's flat buffer */
        ctx->_buf = buf;

        /* completion and bookkeeping */
        ctx->func = func;
        ctx->arg = arg;
        ctx->retry = 0;
        ctx->time = gettime();
        ctx->ioctx = ioctx;

        ret = mbuffer_init(&ctx->buf, 0);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

retry:
        ret = vm_request(ioctx->vm, __lichbd_read, ctx, "lichbd_read1");
        if (likely(!ret))
                return;

        DINFO("retry %u\n", retry);
        if (retry > 10)
                GOTO(err_ret, ret);

        sleep(1);
        retry++;
        goto retry;

err_ret:
        ret = -ret;
        mem_cache_free(MEM_CACHE_4K, ctx);
        func(arg, &ret);
        return;
}

/*
 * vm_request() callback performing one snapshot read.
 *
 * Reads from snapshot ctx->snapid with lichbd_rpc_snap_read().  While
 * __retry__ is set, fewer than 100 retries were made, the error is EAGAIN
 * or ENOSPC and less than 120 time units (as returned by gettime()) have
 * passed since dispatch, it sleeps one second and tries again.  On success
 * the data is copied to ctx->_buf.  The callback receives the byte count
 * or the negated error; the buffer and context are released either way.
 */
static void __lichbd_snap_read(void *arg)
{
        int ret;
        lichbd_ctx_t *ctx = arg;

        DBUG("read size %u offset %llu\n", (int)ctx->size, (LLU)ctx->offset);

retry:
        ret = lichbd_rpc_snap_read(&ctx->snapid, &ctx->buf, ctx->size, ctx->offset);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (!(__retry__ && ctx->retry < 100 && (ret == EAGAIN || ret == ENOSPC)
                      && ((gettime() - ctx->time) < 120))) {
                        DWARN("read cmd(%p, %llu, %llu), ret (%d) %s\n",
                              ctx, (LLU)ctx->offset, (LLU)ctx->size,
                              ret, strerror(ret));
                        GOTO(err_ret, ret);
                }

                if (ctx->retry > 10) {
                        DINFO("read "CHKID_FORMAT" (%llu, %llu),"
                              " ret (%d) %s, need retry %u\n",
                              CHKID_ARG(&ctx->fileid),
                              (LLU)ctx->offset, (LLU)ctx->size,
                              ret, strerror(ret), ctx->retry);
                }

                ctx->retry++;
                schedule_sleep("lichbd_snap_read_retry", 1000 * 1000);

                goto retry;
        }

        YASSERT(ctx->buf.len == ctx->size);
        mbuffer_get(&ctx->buf, ctx->_buf, ctx->buf.len);

        ret = ctx->size;
        goto finish;

err_ret:
        ret = -ret;
finish:
        /* identical teardown for success and failure */
        mbuffer_free(&ctx->buf);
        ctx->func(ctx->arg, &ret);
        mem_cache_free(MEM_CACHE_4K, ctx);
        return;
}

/*
 * Queue an asynchronous read of `size` bytes at `offset` from snapshot
 * `snapid` into the flat buffer `buf`.
 *
 * The request is executed by __lichbd_snap_read() through vm_request();
 * `func(arg, &result)` is called on completion.  Queueing failures go
 * through USLEEP_RETRY (limit 10, one second apart); when that gives up the
 * context is released and `func` is called with the negated error.
 */
void lichbd_snap_read(lichbd_ioctx_t *ioctx, const fileid_t *snapid, char *buf, size_t size, off_t offset,
                        func1_t func, void *arg)
{
        int ret, retry = 0;
        lichbd_ctx_t *ctx;

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*ctx)  < sizeof(mem_cache4k_t), "lichbd_ctx_t");
#endif

        DBUG("read size %u offset %llu\n", (int)size, (LLU)offset);

        ctx = mem_cache_calloc(MEM_CACHE_4K, 1);

        /* what to read */
        ctx->snapid = *snapid;
        ctx->size = size;
        ctx->offset = offset;

        /* where the data goes */
        ctx->_buf = buf;

        /* completion and bookkeeping */
        ctx->func = func;
        ctx->arg = arg;
        ctx->retry = 0;
        ctx->time = gettime();
        ctx->ioctx = ioctx;

        ret = mbuffer_init(&ctx->buf, 0);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

retry:
        ret = vm_request(ioctx->vm, __lichbd_snap_read, ctx, "lichbd_snap_read");
        if (unlikely(ret)) {
                DINFO("retry %u\n", retry);
                USLEEP_RETRY(err_ret, ret, retry, retry, 10, (1000 * 1000));
        }

        return;
err_ret:
        ret = -ret;
        mem_cache_free(MEM_CACHE_4K, ctx);
        func(arg, &ret);
        return;
}

/*
 * Reconnect hook installed as vm_op.reconnect by __lichbd_connect().
 *
 * sd    out: receives the descriptor of the new connection
 * _ctx  the rpc_vm_ctx_t registered with the vm (pool, path, sockid)
 *
 * Resets the rpc table entry for the old socket, then calls
 * lichbd_rpc_connect() until it succeeds.  A failure is retried (one second
 * apart) only while the error is EAGAIN or ENONET and both the retry count
 * and the elapsed time are below _get_rpc_timeout() / 2.
 *
 * Returns 0 on success or the error produced by _errno_net().
 */
static int __lichbd_rpc_reconnect(int *sd, void *_ctx)
{
        int ret, retry = 0;
        rpc_vm_ctx_t *ctx = _ctx;
        time_t t;

        rpc_table_reset(__rpc_table__, &ctx->sockid, NULL);

        ANALYSIS_BEGIN(0);

        t = gettime();
        while (1) {
                ret = lichbd_rpc_connect(&ctx->sockid, ctx->pool, ctx->path);
                if (unlikely(ret)) {
                        ret = _errno_net(ret);
                        DWARN("conn fail retry %d (%u) %s\n", retry, ret, strerror(ret));
                        if ((ret == EAGAIN || ret == ENONET)
                            && retry < _get_rpc_timeout() / 2
                            && gettime() - t < _get_rpc_timeout() / 2) {
                                sleep(1);
                                retry++;
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                break;
        }

        /* hand the fresh descriptor back to the vm layer */
        *sd = ctx->sockid.sd;

        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 3), NULL);
        
        return 0;
err_ret:
        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 3), NULL);
        return ret;
}

/*
 * Periodic check hook (installed as vm_op.check): delegates to
 * lichbd_rpc_check() and always reports success.
 */
int lichbd_check()
{
        DBUG("lichbd check\n");
        lichbd_rpc_check();
        return 0;
}

/*
 * Open the rpc connection for `path` and create the vm that will run the
 * I/O requests for it.
 *
 * pool   pool name passed to lichbd_rpc_connect()
 * path   full image path (already expanded by the caller)
 * ioctx  on success ioctx->vm is set to the new vm
 *
 * The connect is retried once per second, up to 600 times, while the error
 * is EAGAIN or ENONET; warnings are logged after 30 retries.
 * Returns 0 on success or an errno-style code; on failure after the
 * connection was made, the context is freed and the socket closed.
 *
 * NOTE(review): ctx->sockid is not assigned here although
 * __lichbd_rpc_reconnect() reads it -- confirm it is filled in elsewhere
 * (e.g. by the vm init hook).
 * NOTE(review): the allocation only adds room for `path`; strcpy() into
 * ctx->pool is unbounded -- confirm the rpc_vm_ctx_t layout and the maximum
 * pool name length.
 */
static int __lichbd_connect(const char *pool, const char *path, lichbd_ioctx_t *ioctx)
{
        int ret, retry;
        vm_t *vm;
        vm_op_t vm_op;
        rpc_vm_ctx_t *ctx;
        sockid_t sockid;

        DBUG("connect to %s\n", path);

        retry = 0;
        while (1) {
                ret = lichbd_rpc_connect(&sockid, pool, path);
                if (unlikely(ret)) {
                        ret = _errno_net(ret);
                        if (retry > 30)
                                DWARN("conn fail retry %d (%u) %s\n", retry, ret, strerror(ret));
                        if ((ret == EAGAIN || ret == ENONET) && retry < 600) {
                                sleep(1);
                                retry++;
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                break;
        }

        /* context handed to the vm; sized to hold the path after the struct */
        ret = ymalloc((void **)&ctx, sizeof(*ctx) + strlen(path) + 1);
        if (unlikely(ret))
                GOTO(err_sd, ret);

        strcpy(ctx->path, path);
        strcpy(ctx->pool, pool);
        memset(&vm_op, 0x0, sizeof(vm_op));
        vm_op.name = "lichbd_rpc";
        vm_op.sd = sockid.sd;
        vm_op.init = lichbd_rpc_init;
        vm_op.exec = lichbd_rpc_reply;
        vm_op.check = lichbd_check;
        vm_op.reconnect = __lichbd_rpc_reconnect;
        vm_op.ctx = ctx;

        ret = vm_create(&vm_op, &vm);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ioctx->vm = vm;

        return 0;
err_free:
        yfree((void **)&ctx);
err_sd:
        close(sockid.sd);
err_ret:
        return ret;
}

/*
 * Open (optionally creating) an image and connect an I/O context to it.
 *
 * pool   pool name
 * name   image name, expanded with __lichbd_path()
 * ioctx  receives the image id and the vm created for it
 * flag   when O_CREAT is set, a missing image is created
 *
 * Returns 0 on success or an errno-style code.
 */
int lichbd_connect(const char *pool, const char *name, lichbd_ioctx_t *ioctx, int flag)
{
        int ret;
        fileid_t fileid;
        struct stat stbuf;
        char path[MAX_NAME_LEN];

        DBUG("connect to %s\n", name);

        ret = lichbd_open(pool, name, &fileid);
        if (unlikely(ret)) {
                /* only a missing image with O_CREAT is recoverable */
                if (ret != ENOENT || !(flag & O_CREAT))
                        GOTO(err_ret, ret);

                ret = lichbd_create(pool, name, &fileid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        /* the attributes themselves are not used; the call must succeed */
        ret = stor_getattr(pool, &fileid, &stbuf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ioctx->fileid = fileid;

        __lichbd_path(name, path);

        ret = __lichbd_connect(pool, path, ioctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/* Stop the vm attached to `ioctx` by lichbd_connect(). */
void lichbd_disconnect(lichbd_ioctx_t *ioctx)
{
        vm_t *vm = ioctx->vm;

        vm_stop(vm);
}

/*
 * Stack-allocated rendezvous between a synchronous wrapper and the
 * completion callback of the asynchronous call it issues.
 */
typedef struct {
        sem_t sem;              /* posted by the snap_pread/snap_diff callbacks */
        sy_rwlock_t lock;       /* released by the pread/pwrite callbacks */
        int retval;             /* result handed to the callback (bytes or negative error) */
} arg_t;

/*
 * Completion callback for lichbd_pread(): store the result and release the
 * lock the waiting caller is blocked on.
 */
static void __lichbd_pread(void *_arg, void *retval)
{
        arg_t *waiter = _arg;
        const int *result = retval;

        waiter->retval = *result;

        sy_rwlock_unlock(&waiter->lock);
}

/*
 * Synchronous read into a buffer_t.
 *
 * Issues lichbd_read() and waits for its completion callback.  The wait is
 * built from a write lock: it is taken once before the request is issued,
 * released by __lichbd_pread(), and taken a second time here to wait for
 * that release.
 *
 * Returns 0 on success or a positive errno-style code.
 */
int lichbd_pread(lichbd_ioctx_t *ioctx, buffer_t *buf, size_t size, off_t offset, int localize)
{
        int ret;
        arg_t arg;

        ret = sy_rwlock_init(&arg.lock, "lochbd_pread");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* first acquisition: handed over to the completion callback */
        ret = sy_rwlock_wrlock(&arg.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        lichbd_read(ioctx, buf, size, offset, localize, __lichbd_pread, &arg);

        /* second acquisition: wait for the callback's unlock */
        ret = sy_rwlock_wrlock(&arg.lock);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        sy_rwlock_unlock(&arg.lock);

        if (arg.retval < 0) {
                ret = -arg.retval;
                GOTO(err_ret, ret);
        }

        return 0;
err_lock:
        sy_rwlock_unlock(&arg.lock);
err_ret:
        return ret;
}

/*
 * Synchronous read into a flat buffer.
 *
 * Reads through lichbd_pread() into a temporary buffer_t and copies the
 * result to `_buf`.
 *
 * Returns the number of bytes read, or a negative errno-style code.
 *
 * Fixes: the mbuffer_init() result is now checked, and the temporary buffer
 * is released when the read fails (it was leaked before).
 */
int lichbd_pread1(lichbd_ioctx_t *ioctx, char *_buf, size_t size, off_t offset, int localize)
{
        int ret, len;
        buffer_t buf;

        ret = mbuffer_init(&buf, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lichbd_pread(ioctx, &buf, size, offset, localize);
        if (unlikely(ret))
                GOTO(err_free, ret);

        len = buf.len;
        mbuffer_get(&buf, _buf, buf.len);
        mbuffer_free(&buf);

        return len;
err_free:
        mbuffer_free(&buf);
err_ret:
        return -ret;
}

/*
 * Completion callback for lichbd_pwrite(): store the result and release the
 * lock the waiting caller is blocked on.
 */
static void __lichbd_pwrite(void *_arg, void *retval)
{
        arg_t *waiter = _arg;
        const int *result = retval;

        waiter->retval = *result;

        sy_rwlock_unlock(&waiter->lock);
}

/*
 * Synchronous write from a buffer_t.
 *
 * Issues lichbd_write() and waits for its completion callback using the
 * same lock hand-over as lichbd_pread(): lock, issue, lock again to wait
 * for the callback's unlock.
 *
 * Returns 0 on success or a positive errno-style code.
 */
int lichbd_pwrite(lichbd_ioctx_t *ioctx, const buffer_t *buf, size_t size, off_t offset, int flag)
{
        int ret;
        arg_t arg;

        DINFO("lichbd write\n");

        ret = sy_rwlock_init(&arg.lock, "lochbd_pwrite");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* first acquisition: handed over to the completion callback */
        ret = sy_rwlock_wrlock(&arg.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        lichbd_write(ioctx, buf, size, offset, flag, __lichbd_pwrite, &arg);

        /* second acquisition: wait for the callback's unlock */
        ret = sy_rwlock_wrlock(&arg.lock);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        sy_rwlock_unlock(&arg.lock);

        if (arg.retval < 0) {
                ret = -arg.retval;
                GOTO(err_ret, ret);
        }

        return 0;
err_lock:
        sy_rwlock_unlock(&arg.lock);
err_ret:
        return ret;
}

/*
 * Synchronous write from a flat buffer.
 *
 * Copies `_buf` into a temporary buffer_t and writes it with
 * lichbd_pwrite().  Returns 0 on success or a positive errno-style code.
 *
 * Fix: a failing mbuffer_copy() now releases the temporary buffer instead
 * of returning with whatever had been allocated so far.
 */
int lichbd_pwrite1(lichbd_ioctx_t *ioctx, const char *_buf, size_t size, off_t offset, int flag)
{
        int ret;
        buffer_t buf;

        ret = mbuffer_init(&buf, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mbuffer_copy(&buf, _buf, size);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = lichbd_pwrite(ioctx, &buf, size, offset, flag);
        if (unlikely(ret))
                GOTO(err_free, ret);

        mbuffer_free(&buf);

        return 0;
err_free:
        mbuffer_free(&buf);
err_ret:
        return ret;
}

/*
 * Completion callback for lichbd_snap_pread(): store the result and wake
 * the waiter through its semaphore.
 */
static void __lichbd_snap_pread(void *_arg, void *retval)
{
        arg_t *waiter = _arg;
        const int *result = retval;

        waiter->retval = *result;

        sem_post(&waiter->sem);
}

/*
 * Synchronous snapshot read into a flat buffer.
 *
 * Issues lichbd_snap_read() and blocks on a semaphore until the completion
 * callback has run.
 *
 * Returns the number of bytes read, or a negative errno-style code.
 *
 * Fixes:
 *  - sem_init()/sem_wait() report failure as -1 with the cause in errno;
 *    the raw -1 used to be negated and returned as the bogus value 1.
 *  - sem_wait() interrupted by a signal (EINTR) is restarted.  Returning
 *    early would leave the callback writing into this dead stack frame.
 *  - the semaphore is destroyed once the callback has fired.
 */
int lichbd_snap_pread(lichbd_ioctx_t *ioctx, const fileid_t *snapid, char *buf, size_t size, off_t offset)
{
        int ret;
        arg_t arg;

        ret = sem_init(&arg.sem, 0, 0);
        if (unlikely(ret)) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        lichbd_snap_read(ioctx, snapid, buf, size, offset, __lichbd_snap_pread, &arg);

        /* `arg` lives on this stack: do not leave before the callback ran */
        while (sem_wait(&arg.sem) != 0) {
                if (errno == EINTR)
                        continue;

                ret = errno;
                GOTO(err_ret, ret);
        }

        (void) sem_destroy(&arg.sem);

        ret = arg.retval;
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return ret;
err_ret:
        return -ret;
}

/*
 * vm_request() callback performing one snapshot diff.
 *
 * Calls lichbd_rpc_snap_diff() for snapshots ctx->snapid and ctx->snapdst.
 * While __retry__ is set, fewer than 100 retries were made, the error is
 * EAGAIN or ENOSPC and less than 120 time units (as returned by gettime())
 * have passed since dispatch, it sleeps one second and tries again.  On
 * success the data is copied to ctx->_buf.  The callback receives the byte
 * count or the negated error; the buffer and context are released either
 * way.
 */
static void __lichbd_snap_diff_in(void *arg)
{
        int ret;
        lichbd_ctx_t *ctx = arg;

        DBUG("read size %u offset %llu\n", (int)ctx->size, (LLU)ctx->offset);

retry:
        ret = lichbd_rpc_snap_diff(&ctx->snapid, &ctx->snapdst, &ctx->buf, ctx->size, ctx->offset);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (!(__retry__ && ctx->retry < 100 && (ret == EAGAIN || ret == ENOSPC)
                      && ((gettime() - ctx->time) < 120))) {
                        DINFO("read cmd(%p, %llu, %llu), ret (%d) %s\n",
                              ctx, (LLU)ctx->offset, (LLU)ctx->size,
                              ret, strerror(ret));
                        GOTO(err_ret, ret);
                }

                if (ctx->retry > 10) {
                        DINFO("read "CHKID_FORMAT" (%llu, %llu),"
                              " ret (%d) %s, need retry %u\n",
                              CHKID_ARG(&ctx->fileid),
                              (LLU)ctx->offset, (LLU)ctx->size,
                              ret, strerror(ret), ctx->retry);
                }

                ctx->retry++;
                schedule_sleep("lichbd_snap_diff_retry", 1000 * 1000);

                goto retry;
        }

        YASSERT(ctx->buf.len == ctx->size);
        mbuffer_get(&ctx->buf, ctx->_buf, ctx->buf.len);

        ret = ctx->size;
        goto finish;

err_ret:
        ret = -ret;
finish:
        /* identical teardown for success and failure */
        mbuffer_free(&ctx->buf);
        ctx->func(ctx->arg, &ret);
        mem_cache_free(MEM_CACHE_4K, ctx);
        return;
}

/*
 * Queue an asynchronous diff between snapshots `snapsrc` and `snapdst`,
 * covering `size` bytes at `offset`, with the result written to `buf`.
 *
 * The request is executed by __lichbd_snap_diff_in() through vm_request();
 * `func(arg, &result)` is called on completion.  A failed vm_request() is
 * retried once per second; once the retry counter exceeds 10 the context is
 * released and `func` is called with the negated error.
 */
void lichbd_snap_diff_in(lichbd_ioctx_t *ioctx, const fileid_t *snapsrc,
                const fileid_t *snapdst, char *buf, size_t size, off_t offset, func1_t func, void *arg)
{
        int ret, retry = 0;
        lichbd_ctx_t *ctx;

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*ctx)  < sizeof(mem_cache4k_t), "lichbd_ctx_t");
#endif

        DBUG("read size %u offset %llu\n", (int)size, (LLU)offset);

        ctx = mem_cache_calloc(MEM_CACHE_4K, 1);

        /* what to compare */
        ctx->snapid = *snapsrc;
        ctx->snapdst = *snapdst;
        ctx->size = size;
        ctx->offset = offset;

        /* where the result goes */
        ctx->_buf = buf;

        /* completion and bookkeeping */
        ctx->func = func;
        ctx->arg = arg;
        ctx->retry = 0;
        ctx->time = gettime();
        ctx->ioctx = ioctx;

        ret = mbuffer_init(&ctx->buf, 0);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

retry:
        ret = vm_request(ioctx->vm, __lichbd_snap_diff_in, ctx, "lichbd_snap_diff");
        if (likely(!ret))
                return;

        DINFO("retry %u\n", retry);
        if (retry > 10)
                GOTO(err_ret, ret);

        sleep(1);
        retry++;
        goto retry;

err_ret:
        ret = -ret;
        mem_cache_free(MEM_CACHE_4K, ctx);
        func(arg, &ret);
        return;
}

/*
 * Completion callback for lichbd_snap_diff(): store the result and wake
 * the waiter through its semaphore.
 */
static void __lichbd_snap_diff(void *_arg, void *retval)
{
        arg_t *waiter = _arg;
        const int *result = retval;

        waiter->retval = *result;

        sem_post(&waiter->sem);
}

/*
 * Synchronous snapshot diff into a flat buffer.
 *
 * Issues lichbd_snap_diff_in() and blocks on a semaphore until the
 * completion callback has run.
 *
 * Returns the number of bytes produced, or a negative errno-style code.
 *
 * Fixes:
 *  - sem_init()/sem_wait() report failure as -1 with the cause in errno;
 *    the raw -1 used to be negated and returned as the bogus value 1.
 *  - sem_wait() interrupted by a signal (EINTR) is restarted.  Returning
 *    early would leave the callback writing into this dead stack frame.
 *  - the semaphore is destroyed once the callback has fired.
 */
int lichbd_snap_diff(lichbd_ioctx_t *ioctx, const fileid_t *snapsrc,
                const fileid_t *snapdst, char *buf, size_t size, off_t offset)
{
        int ret;
        arg_t arg;

        ret = sem_init(&arg.sem, 0, 0);
        if (unlikely(ret)) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        lichbd_snap_diff_in(ioctx, snapsrc, snapdst, buf, size, offset, __lichbd_snap_diff, &arg);

        /* `arg` lives on this stack: do not leave before the callback ran */
        while (sem_wait(&arg.sem) != 0) {
                if (errno == EINTR)
                        continue;

                ret = errno;
                GOTO(err_ret, ret);
        }

        (void) sem_destroy(&arg.sem);

        ret = arg.retval;
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return ret;
err_ret:
        return -ret;
}
